[Embulk] Trying out the guess feature (with addendum)
Addendum: We received a comment. When loading data into PostgreSQL, you should set default_timezone; otherwise the dates in the loaded data end up shifted by nine hours.
out:
  type: postgresql
  default_timezone: "Japan" # <-- this
See here for details.
Thank you for pointing this out!
Hi, this is Kawasaki. I've been looking into Embulk lately.
You can find the previous article here: [Embulk] A personal summary of Embulk
On Friday, April 22 I picked up "WEB+DB PRESS Vol.92", which features an article on Embulk. Using the article as a reference, I'll start with installing Embulk.
Incidentally, Treasure Data is holding an event today (Monday, April 25), but Embulk doesn't seem to be on the agenda this time. Too bad.
Treasure Data Tech Talk http://eventdots.jp/event/584571
Installation
See here for how to install Embulk: https://github.com/embulk/embulk#quick-start
curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
Running the sample
Let's try the example command that ships with Embulk. This step sets up a small demo that you can run end to end.
embulk example ./try1
embulk guess ./try1/seed.yml -o config.yml
embulk preview config.yml
embulk run config.yml
Loading an existing dataset with the guess feature
To start, let's use the guess feature to load the IRIS dataset, famous in the statistics world, into the database automatically. The data looks like this:
$ cat IRIS.csv
Sepal_Length,Sepal_Width,Petal_Length,Petal_Width,Species
5.1,3.5,1.4,0.2,setosa
4.9,3.0,1.4,0.2,setosa
4.7,3.2,1.3,0.2,setosa
4.6,3.1,1.5,0.2,setosa
5.0,3.6,1.4,0.2,setosa
Write the settings for the guess feature in a guess.yml file like the one below (create it with a text editor):
$ cat guess.yml
in:
  type: file
  path_prefix: "./IRIS.csv"
out:
  type: postgresql
  host: localhost
  user: postgres
  password: xxxx
  database: postgres
  table: iris
  mode: insert
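Incidentally, the seed file only needs enough information for guess to locate the input; the parser section is filled in for you. If you just want to inspect the guessed schema without a database at hand, pointing the output at Embulk's built-in stdout plugin should also work (a minimal sketch, not taken from this run):

```yaml
in:
  type: file
  path_prefix: "./IRIS.csv"
out:
  type: stdout
```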
Running the guess command saves the inferred configuration to config.yml.
$ embulk guess ./guess.yml -o config.yml
2016-04-25 16:10:35.393 +0900: Embulk v0.8.8
2016-04-25 16:10:36.574 +0900 [INFO] (0001:guess): Listing local files at directory '.' filtering filename by prefix 'IRIS.csv'
2016-04-25 16:10:36.583 +0900 [INFO] (0001:guess): Loading files [IRIS.csv]
2016-04-25 16:10:36.637 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2016-04-25 16:10:36.652 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2016-04-25 16:10:36.667 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2016-04-25 16:10:36.676 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path
in:
  type: file
  path_prefix: ./IRIS.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: Sepal_Length, type: double}
    - {name: Sepal_Width, type: double}
    - {name: Petal_Length, type: double}
    - {name: Petal_Width, type: double}
    - {name: Species, type: string}
out: {type: postgresql, host: localhost, user: postgres, password: xxxx, database: postgres, table: iris, mode: insert}
Created 'config.yml' file.
The contents of config.yml look like the following. The settings written in guess.yml are carried over as-is, and in addition the guessed details are saved: the fact that the input is CSV, its parser options, and the column definitions.
$ cat config.yml
in:
  type: file
  path_prefix: ./IRIS.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: Sepal_Length, type: double}
    - {name: Sepal_Width, type: double}
    - {name: Petal_Length, type: double}
    - {name: Petal_Width, type: double}
    - {name: Species, type: string}
out: {type: postgresql, host: localhost, user: postgres, password: xxxx, database: postgres, table: iris, mode: insert}
Now run the data load.
$ embulk run config.yml
2016-04-25 16:19:01.866 +0900: Embulk v0.8.8
2016-04-25 16:19:03.537 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-postgresql (0.5.1)
2016-04-25 16:19:03.580 +0900 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix 'IRIS.csv'
2016-04-25 16:19:03.587 +0900 [INFO] (0001:transaction): Loading files [IRIS.csv]
2016-04-25 16:19:03.649 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2016-04-25 16:19:03.674 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:19:03.764 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-04-25 16:19:03.766 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:03.766 +0900 [INFO] (0001:transaction): Using insert mode
2016-04-25 16:19:03.785 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp000"
2016-04-25 16:19:03.786 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:03.796 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "iris_71dc4e71a1b5980_bl_tmp000" ("Sepal_Length" DOUBLE PRECISION, "Sepal_Width" DOUBLE PRECISION, "Petal_Length" DOUBLE PRECISION, "Petal_Width" DOUBLE PRECISION, "Species" TEXT)
2016-04-25 16:19:03.812 +0900 [INFO] (0001:transaction): > 0.02 seconds
2016-04-25 16:19:03.814 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp001"
2016-04-25 16:19:03.815 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:03.817 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "iris_71dc4e71a1b5980_bl_tmp001" ("Sepal_Length" DOUBLE PRECISION, "Sepal_Width" DOUBLE PRECISION, "Petal_Length" DOUBLE PRECISION, "Petal_Width" DOUBLE PRECISION, "Species" TEXT)
2016-04-25 16:19:03.824 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:19:03.824 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp002"
2016-04-25 16:19:03.825 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:03.828 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "iris_71dc4e71a1b5980_bl_tmp002" ("Sepal_Length" DOUBLE PRECISION, "Sepal_Width" DOUBLE PRECISION, "Petal_Length" DOUBLE PRECISION, "Petal_Width" DOUBLE PRECISION, "Species" TEXT)
2016-04-25 16:19:03.833 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:19:03.834 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp003"
2016-04-25 16:19:03.835 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:03.837 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "iris_71dc4e71a1b5980_bl_tmp003" ("Sepal_Length" DOUBLE PRECISION, "Sepal_Width" DOUBLE PRECISION, "Petal_Length" DOUBLE PRECISION, "Petal_Width" DOUBLE PRECISION, "Species" TEXT)
2016-04-25 16:19:03.841 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:03.895 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2016-04-25 16:19:03.931 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:19:03.948 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:19:03.949 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:19:03.950 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "iris_71dc4e71a1b5980_bl_tmp000" ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species") FROM STDIN
2016-04-25 16:19:03.955 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:19:03.967 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:19:03.968 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:19:03.968 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "iris_71dc4e71a1b5980_bl_tmp001" ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species") FROM STDIN
2016-04-25 16:19:03.970 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:19:03.983 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:19:03.984 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:19:03.984 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "iris_71dc4e71a1b5980_bl_tmp002" ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species") FROM STDIN
2016-04-25 16:19:03.986 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:19:03.998 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:19:04.000 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:19:04.000 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "iris_71dc4e71a1b5980_bl_tmp003" ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species") FROM STDIN
2016-04-25 16:19:04.061 +0900 [INFO] (0017:task-0000): Loading 150 rows (3,800 bytes)
2016-04-25 16:19:04.073 +0900 [INFO] (0017:task-0000): > 0.01 seconds (loaded 150 rows in total)
2016-04-25 16:19:04.076 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2016-04-25 16:19:04.077 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2016-04-25 16:19:04.088 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-04-25 16:19:04.089 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:04.091 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "iris" ("Sepal_Length" DOUBLE PRECISION, "Sepal_Width" DOUBLE PRECISION, "Petal_Length" DOUBLE PRECISION, "Petal_Width" DOUBLE PRECISION, "Species" TEXT)
2016-04-25 16:19:04.104 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:19:04.106 +0900 [INFO] (0001:transaction): SQL: INSERT INTO "iris" ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species") SELECT "Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species" FROM "iris_71dc4e71a1b5980_bl_tmp000" UNION ALL SELECT "Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species" FROM "iris_71dc4e71a1b5980_bl_tmp001" UNION ALL SELECT "Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species" FROM "iris_71dc4e71a1b5980_bl_tmp002" UNION ALL SELECT "Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species" FROM "iris_71dc4e71a1b5980_bl_tmp003"
2016-04-25 16:19:04.113 +0900 [INFO] (0001:transaction): > 0.01 seconds (150 rows)
2016-04-25 16:19:04.125 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:19:04.136 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-04-25 16:19:04.137 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:04.137 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp000"
2016-04-25 16:19:04.144 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:19:04.144 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp001"
2016-04-25 16:19:04.147 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:04.147 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp002"
2016-04-25 16:19:04.150 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:04.151 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp003"
2016-04-25 16:19:04.156 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:19:04.160 +0900 [INFO] (main): Committed.
2016-04-25 16:19:04.160 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"IRIS.csv"},"out":{}}
The data was loaded into the database successfully. Let's check the table contents.
Let's also check each column's data type.
CREATE TABLE public.iris (
    "Sepal_Length" double precision,
    "Sepal_Width" double precision,
    "Petal_Length" double precision,
    "Petal_Width" double precision,
    "Species" text
)
The data load succeeded.
Trying a dataset containing Japanese text
Next, let's try data that contains Japanese text. As open data, I used a CSV file that can be downloaded from the site below.
Several kinds of data are available for download; this time I used the "station data". The data looks like this:
$ cat station20160401free.csv
station_cd,station_g_cd,station_name,station_name_k,station_name_r,line_cd,pref_cd,post,add,lon,lat,open_ymd,close_ymd,e_status,e_sort
1110101,1110101,函館,,,11101,1,040-0063,北海道函館市若松町12-13,140.726413,41.773709,1902-12-10,,0,1110101
1110102,1110102,五稜郭,,,11101,1,041-0813,函館市亀田本町,140.733539,41.803557,,,0,1110102
1110103,1110103,桔梗,,,11101,1,041-1210,北海道函館市桔梗3丁目41-36,140.722952,41.846457,1902-12-10,,0,1110103
1110104,1110104,大中山,,,11101,1,041-1121,亀田郡七飯町大字大中山,140.71358,41.864641,,,0,1110104
1110105,1110105,七飯,,,11101,1,041-1111,亀田郡七飯町字本町,140.688556,41.886971,,,0,1110105
As before, write the settings for the guess feature in a guess.yml file (created with a text editor):
in:
  type: file
  path_prefix: "./station20160401free.csv"
out:
  type: postgresql
  host: localhost
  user: postgres
  password: xxxx
  database: postgres
  table: station
  mode: insert
Running the guess command saves the inferred configuration to config.yml.
$ embulk guess ./guess.yml -o config.yml
2016-04-25 16:55:09.781 +0900: Embulk v0.8.8
2016-04-25 16:55:10.655 +0900 [INFO] (0001:guess): Listing local files at directory '.' filtering filename by prefix 'station20160401free.csv'
2016-04-25 16:55:10.659 +0900 [INFO] (0001:guess): Loading files [station20160401free.csv]
2016-04-25 16:55:10.707 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2016-04-25 16:55:10.720 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2016-04-25 16:55:10.734 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2016-04-25 16:55:10.738 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path
in:
  type: file
  path_prefix: ./station20160401free.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: station_cd, type: long}
    - {name: station_g_cd, type: long}
    - {name: station_name, type: string}
    - {name: station_name_k, type: string}
    - {name: station_name_r, type: string}
    - {name: line_cd, type: long}
    - {name: pref_cd, type: long}
    - {name: post, type: string}
    - {name: add, type: string}
    - {name: lon, type: string}
    - {name: lat, type: string}
    - {name: open_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: close_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: e_status, type: long}
    - {name: e_sort, type: long}
out: {type: postgresql, host: localhost, user: postgres, password: xxxx, database: postgres, table: station, mode: insert}
Created 'config.yml' file.
The contents of config.yml are as follows.
$ cat config.yml
in:
  type: file
  path_prefix: ./station20160401free.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: station_cd, type: long}
    - {name: station_g_cd, type: long}
    - {name: station_name, type: string}
    - {name: station_name_k, type: string}
    - {name: station_name_r, type: string}
    - {name: line_cd, type: long}
    - {name: pref_cd, type: long}
    - {name: post, type: string}
    - {name: add, type: string}
    - {name: lon, type: string}
    - {name: lat, type: string}
    - {name: open_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: close_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: e_status, type: long}
    - {name: e_sort, type: long}
out: {type: postgresql, host: localhost, user: postgres, password: xxxx, database: postgres, table: station, mode: insert}
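Note that open_ymd and close_ymd were guessed as timestamp columns, which is exactly the case the addendum at the top of this post is about: without default_timezone, the PostgreSQL output can end up with dates shifted by nine hours. A sketch of the out: section with the setting added (same placeholder host and credentials as above):

```yaml
out:
  type: postgresql
  host: localhost
  user: postgres
  password: xxxx
  database: postgres
  table: station
  mode: insert
  default_timezone: "Japan"
```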
Run the data load.
$ embulk run config.yml
2016-04-25 16:58:42.047 +0900: Embulk v0.8.8
2016-04-25 16:58:43.584 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-postgresql (0.5.1)
2016-04-25 16:58:43.655 +0900 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix 'station20160401free.csv'
2016-04-25 16:58:43.660 +0900 [INFO] (0001:transaction): Loading files [station20160401free.csv]
2016-04-25 16:58:43.742 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2016-04-25 16:58:43.759 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:58:43.861 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-04-25 16:58:43.863 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:43.864 +0900 [INFO] (0001:transaction): Using insert mode
2016-04-25 16:58:43.880 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp000"
2016-04-25 16:58:43.882 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:43.894 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station_ce331f972880_bl_tmp000" ("station_cd" BIGINT, "station_g_cd" BIGINT, "station_name" TEXT, "station_name_k" TEXT, "station_name_r" TEXT, "line_cd" BIGINT, "pref_cd" BIGINT, "post" TEXT, "add" TEXT, "lon" TEXT, "lat" TEXT, "open_ymd" TIMESTAMP WITH TIME ZONE, "close_ymd" TIMESTAMP WITH TIME ZONE, "e_status" BIGINT, "e_sort" BIGINT)
2016-04-25 16:58:43.909 +0900 [INFO] (0001:transaction): > 0.02 seconds
2016-04-25 16:58:43.910 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp001"
2016-04-25 16:58:43.911 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:43.916 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station_ce331f972880_bl_tmp001" ("station_cd" BIGINT, "station_g_cd" BIGINT, "station_name" TEXT, "station_name_k" TEXT, "station_name_r" TEXT, "line_cd" BIGINT, "pref_cd" BIGINT, "post" TEXT, "add" TEXT, "lon" TEXT, "lat" TEXT, "open_ymd" TIMESTAMP WITH TIME ZONE, "close_ymd" TIMESTAMP WITH TIME ZONE, "e_status" BIGINT, "e_sort" BIGINT)
2016-04-25 16:58:43.923 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:58:43.924 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp002"
2016-04-25 16:58:43.924 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:43.929 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station_ce331f972880_bl_tmp002" ("station_cd" BIGINT, "station_g_cd" BIGINT, "station_name" TEXT, "station_name_k" TEXT, "station_name_r" TEXT, "line_cd" BIGINT, "pref_cd" BIGINT, "post" TEXT, "add" TEXT, "lon" TEXT, "lat" TEXT, "open_ymd" TIMESTAMP WITH TIME ZONE, "close_ymd" TIMESTAMP WITH TIME ZONE, "e_status" BIGINT, "e_sort" BIGINT)
2016-04-25 16:58:43.933 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:43.933 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp003"
2016-04-25 16:58:43.934 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:43.939 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station_ce331f972880_bl_tmp003" ("station_cd" BIGINT, "station_g_cd" BIGINT, "station_name" TEXT, "station_name_k" TEXT, "station_name_r" TEXT, "line_cd" BIGINT, "pref_cd" BIGINT, "post" TEXT, "add" TEXT, "lon" TEXT, "lat" TEXT, "open_ymd" TIMESTAMP WITH TIME ZONE, "close_ymd" TIMESTAMP WITH TIME ZONE, "e_status" BIGINT, "e_sort" BIGINT)
2016-04-25 16:58:43.948 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:58:44.020 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2016-04-25 16:58:44.047 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:58:44.067 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:58:44.068 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:58:44.069 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "station_ce331f972880_bl_tmp000" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort") FROM STDIN
2016-04-25 16:58:44.075 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:58:44.090 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:58:44.091 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:58:44.091 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "station_ce331f972880_bl_tmp001" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort") FROM STDIN
2016-04-25 16:58:44.093 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:58:44.104 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:58:44.105 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:58:44.105 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "station_ce331f972880_bl_tmp002" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort") FROM STDIN
2016-04-25 16:58:44.107 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:58:44.123 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:58:44.124 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:58:44.124 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "station_ce331f972880_bl_tmp003" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort") FROM STDIN
2016-04-25 16:58:45.008 +0900 [INFO] (0017:task-0000): Loading 2,723 rows (320,762 bytes)
2016-04-25 16:58:45.053 +0900 [INFO] (0017:task-0000): > 0.05 seconds (loaded 2,723 rows in total)
2016-04-25 16:58:45.056 +0900 [INFO] (0017:task-0000): Loading 2,720 rows (320,300 bytes)
2016-04-25 16:58:45.084 +0900 [INFO] (0017:task-0000): > 0.03 seconds (loaded 2,720 rows in total)
2016-04-25 16:58:45.086 +0900 [INFO] (0017:task-0000): Loading 2,757 rows (322,460 bytes)
2016-04-25 16:58:45.112 +0900 [INFO] (0017:task-0000): > 0.03 seconds (loaded 2,757 rows in total)
2016-04-25 16:58:45.113 +0900 [INFO] (0017:task-0000): Loading 2,634 rows (310,929 bytes)
2016-04-25 16:58:45.141 +0900 [INFO] (0017:task-0000): > 0.03 seconds (loaded 2,634 rows in total)
2016-04-25 16:58:45.146 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2016-04-25 16:58:45.148 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2016-04-25 16:58:45.161 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-04-25 16:58:45.162 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:45.167 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station" ("station_cd" BIGINT, "station_g_cd" BIGINT, "station_name" TEXT, "station_name_k" TEXT, "station_name_r" TEXT, "line_cd" BIGINT, "pref_cd" BIGINT, "post" TEXT, "add" TEXT, "lon" TEXT, "lat" TEXT, "open_ymd" TIMESTAMP WITH TIME ZONE, "close_ymd" TIMESTAMP WITH TIME ZONE, "e_status" BIGINT, "e_sort" BIGINT)
2016-04-25 16:58:45.179 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:58:45.181 +0900 [INFO] (0001:transaction): SQL: INSERT INTO "station" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort") SELECT "station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort" FROM "station_ce331f972880_bl_tmp000" UNION ALL SELECT "station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort" FROM "station_ce331f972880_bl_tmp001" UNION ALL SELECT "station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort" FROM "station_ce331f972880_bl_tmp002" UNION ALL SELECT "station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort" FROM "station_ce331f972880_bl_tmp003"
2016-04-25 16:58:45.227 +0900 [INFO] (0001:transaction): > 0.05 seconds (10,834 rows)
2016-04-25 16:58:45.247 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:58:45.257 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-04-25 16:58:45.258 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:45.258 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp000"
2016-04-25 16:58:45.271 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:58:45.271 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp001"
2016-04-25 16:58:45.277 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:58:45.277 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp002"
2016-04-25 16:58:45.282 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:58:45.282 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp003"
2016-04-25 16:58:45.289 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:58:45.292 +0900 [INFO] (main): Committed.
2016-04-25 16:58:45.292 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"station20160401free.csv"},"out":{}}
The data was loaded into the database successfully. Let's check the table contents.
Let's also check each column's data type.
CREATE TABLE public.station (
    station_cd bigint,
    station_g_cd bigint,
    station_name text,
    station_name_k text,
    station_name_r text,
    line_cd bigint,
    pref_cd bigint,
    post text,
    add text,
    lon text,
    lat text,
    open_ymd timestamp with time zone,
    close_ymd timestamp with time zone,
    e_status bigint,
    e_sort bigint
)
The data load succeeded. Note that the latitude (lat) and longitude (lon) values were loaded as the text type.
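The guessed schema is only a starting point, so the column list in config.yml can be edited by hand before running the load. If you would rather have lon and lat as numbers, for example, changing their entries like this should work, assuming every row really does contain a numeric value (guess may have fallen back to string because of blank or irregular entries further down the file):

```yaml
    - {name: lon, type: double}
    - {name: lat, type: double}
```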
Summary
I tried out "guess", Embulk's feature for inferring the layout of the data it reads. Since it automatically infers the structure of a data file for you, it should make data-handling work considerably more efficient.
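The general idea behind the CSV guess can be illustrated with a toy sketch. To be clear, this is not Embulk's actual algorithm, just the try-the-narrowest-type-first approach: a column is long if every sample value is an integer, double if every value is numeric, and string otherwise.

```shell
# Toy illustration of guess-style type inference (not Embulk's real algorithm).
guess_type() {
  ok_long=1; ok_double=1
  for v in "$@"; do
    echo "$v" | grep -Eq '^-?[0-9]+$'            || ok_long=0
    echo "$v" | grep -Eq '^-?[0-9]+(\.[0-9]+)?$' || ok_double=0
  done
  if [ "$ok_long" -eq 1 ]; then echo long
  elif [ "$ok_double" -eq 1 ]; then echo double
  else echo string
  fi
}

guess_type 5.1 3.5 1.4 0.2       # Sepal_Length-style values -> double
guess_type 1110101 1110102       # station_cd-style values   -> long
guess_type setosa versicolor     # Species-style values      -> string
```

The real implementation goes much further, of course: as the logs above show, Embulk also ships guess plugins for gzip, bzip2, and JSON, and the CSV guess additionally recognizes timestamp columns and formats, as it did for open_ymd and close_ymd.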
Next time
Next time, I'd like to experiment with Redshift as the output database.